Collaborator

@ylwu-amzn ylwu-amzn commented Aug 12, 2025

Description

While building a tutorial for Agentic RAG with the Bedrock OpenAI OSS model, I found that the Converse API returns a different result than it does with the Bedrock Claude model.

Building a post-process function with Painless script is painful. This PR proposes a new processor framework for processing outputs from ML models and tools using chainable processors.
It supports multiple built-in processors:

  1. to_string: Converts input to string format
  2. regex_replace: Performs regex-based text replacement
  3. jsonpath_filter: Filters JSON using JsonPath expressions
  4. extract_json: Extracts JSON objects/arrays from text
  5. regex_capture: Captures text using regex groups
  6. remove_jsonpath: Removes elements at specified JsonPath
  7. conditional: Applies different processors based on conditions
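
To make the "chainable" idea concrete, here is a minimal sketch of how two of the listed processors could be composed. The class name `ProcessorChainSketch` and its methods are hypothetical and are not the classes added by this PR; the null handling in `to_string` is also an assumption.

```java
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.regex.Pattern;

/** Hypothetical sketch of chainable output processors; not the PR's actual classes. */
final class ProcessorChainSketch {

    /** to_string: converts any input to its string form (null becomes "" here, an assumption). */
    static UnaryOperator<Object> toStringProcessor() {
        return input -> input == null ? "" : String.valueOf(input);
    }

    /** regex_replace: replaces every match; non-string input passes through unchanged. */
    static UnaryOperator<Object> regexReplace(String pattern, String replacement) {
        Pattern compiled = Pattern.compile(pattern);
        return input -> input instanceof String
            ? compiled.matcher((String) input).replaceAll(replacement)
            : input;
    }

    /** Applies the processors in order, feeding each one the previous output. */
    static Object applyAll(Object input, List<UnaryOperator<Object>> processors) {
        Object current = input;
        for (UnaryOperator<Object> processor : processors) {
            current = processor.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        Object result = applyAll(12345, List.of(toStringProcessor(), regexReplace("\\d{2}$", "xx")));
        System.out.println(result); // prints 123xx
    }
}
```

Each processor only sees the output of the one before it, which is why the order of entries in `output_processors` matters.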

Test Example

Create Agent with ListIndexTool.

Configure two output processors for ListIndexTool:

  1. Use regex_replace to remove all lines containing an index whose name starts with .plugins
  2. Use regex_replace to remove the first column (row), since the row numbers become wrong after lines are removed

POST _plugins/_ml/agents/_register
{
  "name": "Test Agent",
  "type": "flow",
  "description": "This is a demo agent",
  "tools": [
    {
      "type": "ListIndexTool",
      "parameters": {
        "output_processors": [
          {
            "type": "regex_replace",
            "pattern": """(?m)^(?!row,health,status).*?,.*?,.*?,\.plugins[^\n]*\n""",
            "replacement": ""
          },
          {
            "type": "regex_replace",
            "pattern": """(?m)^row,|^\d+,""",
            "replacement": ""
          }
        ]
      }
    }
  ]
}
POST _plugins/_ml/agents/VTgbg5kBxVtFlT1nPnn4/_execute
{
  "parameters": {}
}

Sample response

{
  "inference_results": [
    {
      "output": [
        {
          "name": "response",
          "result": """health,status,index,uuid,pri(number of primary shards),rep(number of replica shards),docs.count(number of available documents),docs.deleted(number of deleted documents),store.size(store size of primary and replica shards),pri.store.size(store size of primary shards)
yellow,open,ss4o_logs-otel-2025.09.22,aePQaQvuSmygcLXMZiZMpA,1,1,218380,0,117.8mb,117.8mb
yellow,open,jaeger-service-2025-09-20,W_c332tRQiaYyYA7Q93a5g,1,1,84,7,21.9kb,21.9kb
yellow,open,jaeger-service-2025-09-22,aG2VFQLMSVWcefCBAgo2iQ,1,1,84,0,50kb,50kb
yellow,open,jaeger-service-2025-09-23,rmO1fmMxQI2ovc_n-uHN2w,1,1,84,0,30.4kb,30.4kb
green,open,top_queries-2025.09.22-00440,lMQ-f2xtS4eLbNA19GdD2A,1,0,1755,0,639.2kb,639.2kb
yellow,open,jaeger-service-2025-09-24,zM41TnvyTEWkp1wSCwgPdQ,1,1,84,0,20kb,20kb
green,open,top_queries-2025.09.25-00443,epX66GgQSEeznqQoyglZFg,1,0,218,47,576.2kb,576.2kb
"""
        }
      ]
    }
  ]
}
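
The two patterns from the register request can be checked in isolation with plain `java.util.regex`. This is a sketch only: the class name `ListIndexRegexSketch` is hypothetical, and the input is a made-up sample that assumes ListIndexTool returns CSV whose header starts with `row,health,status,index`.

```java
import java.util.regex.Pattern;

/** Hypothetical check of the two regex_replace patterns; not part of the PR. */
final class ListIndexRegexSketch {
    // Same expressions as in the register request, written as Java string literals.
    static final Pattern PLUGIN_ROWS =
        Pattern.compile("(?m)^(?!row,health,status).*?,.*?,.*?,\\.plugins[^\\n]*\\n");
    static final Pattern ROW_COLUMN = Pattern.compile("(?m)^row,|^\\d+,");

    /** Step 1 drops rows for .plugins* indices; step 2 drops the leading row column. */
    static String clean(String csv) {
        String withoutPluginRows = PLUGIN_ROWS.matcher(csv).replaceAll("");
        return ROW_COLUMN.matcher(withoutPluginRows).replaceAll("");
    }

    public static void main(String[] args) {
        // Made-up sample data, shortened to five columns.
        String csv = "row,health,status,index,uuid\n"
            + "1,green,open,.plugins-ml-config,abc\n"
            + "2,yellow,open,my-index,def\n"
            + "3,green,open,.plugins-ml-agent,ghi\n";
        System.out.print(clean(csv));
        // prints:
        // health,status,index,uuid
        // yellow,open,my-index,def
    }
}
```

Removing the row column second avoids leaving gaps in the numbering (1, 3, ...) once the `.plugins` rows are gone.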

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@ylwu-amzn ylwu-amzn had a problem deploying to ml-commons-cicd-env August 12, 2025 00:37 — with GitHub Actions Failure
@ylwu-amzn ylwu-amzn had a problem deploying to ml-commons-cicd-env August 12, 2025 01:18 — with GitHub Actions Failure
@ylwu-amzn ylwu-amzn had a problem deploying to ml-commons-cicd-env August 12, 2025 19:42 — with GitHub Actions Failure
@ylwu-amzn ylwu-amzn temporarily deployed to ml-commons-cicd-env September 19, 2025 09:44 — with GitHub Actions Inactive
Comment on lines +235 to +238
modelTensor = ModelTensor.builder().name(outputKey).dataAsMap((Map) output).build();
} else if (output instanceof List) {
Map<String, Object> resultMap = new HashMap<>();
resultMap.put("output", output);
Collaborator

We put the output in the output field here, but above we return it as is if it is a map, so the output key may or may not be there. Is this expected?

previousStepListener = nextStepListener;
}
}
// firstTool.run(firstToolExecuteParams, firstStepListener);
Collaborator

test code?

Comment on lines +275 to +281
if (processedOutput instanceof String) {
connector.parseResponse((String) processedOutput, modelTensors, scriptReturnModelTensor);
} else {
connector.parseResponse(processedOutput, modelTensors, scriptReturnModelTensor);
}
Collaborator

@pyek-bot pyek-bot Sep 26, 2025

This condition seems wrong: if it is a string you cast it to String, otherwise you pass the object as is. Why not just pass it in directly and do the type check inside the parseResponse method?

Comment on lines +162 to +166
if (outputParser != null) {
listener.onResponse((T) outputParser.parse(output));
} else {
listener.onResponse((T) output);
}
Collaborator

listener.onResponse((T) (outputParser != null ? outputParser.parse(output) : output));

return captures.get(0);
}
return captures;
// return String.join(" ", captures); // join results with a space
Collaborator

test code?

}
}
if (captures.size() == 1) {
return captures.get(0);
Collaborator

nit: use getFirst()

Comment on lines +137 to +152
if (condition.startsWith(">") && !condition.startsWith(">=")) {
double threshold = Double.parseDouble(condition.substring(1));
return numValue > threshold;
} else if (condition.startsWith("<") && !condition.startsWith("<=")) {
double threshold = Double.parseDouble(condition.substring(1));
return numValue < threshold;
} else if (condition.startsWith(">=")) {
double threshold = Double.parseDouble(condition.substring(2));
return numValue >= threshold;
} else if (condition.startsWith("<=")) {
double threshold = Double.parseDouble(condition.substring(2));
return numValue <= threshold;
} else if (condition.startsWith("==")) {
double threshold = Double.parseDouble(condition.substring(2));
return Math.abs(numValue - threshold) < 1e-10;
}
Collaborator

nit (readability): we don't need else if as there is a return everywhere

rather:

if { return; }

if { return; }
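
A sketch of what the suggested shape could look like for the comparison chain quoted above. The class and method names are hypothetical, and the final `return false` is an assumption, since the excerpt does not show what follows the chain.

```java
/** Hypothetical early-return rewrite of the numeric condition check. */
final class NumericConditionSketch {

    static boolean matches(double numValue, String condition) {
        // Two-character operators are checked first, so ">" never shadows ">=".
        if (condition.startsWith(">=")) {
            return numValue >= Double.parseDouble(condition.substring(2));
        }
        if (condition.startsWith("<=")) {
            return numValue <= Double.parseDouble(condition.substring(2));
        }
        if (condition.startsWith("==")) {
            // Same tolerance as the original code.
            return Math.abs(numValue - Double.parseDouble(condition.substring(2))) < 1e-10;
        }
        if (condition.startsWith(">")) {
            return numValue > Double.parseDouble(condition.substring(1));
        }
        if (condition.startsWith("<")) {
            return numValue < Double.parseDouble(condition.substring(1));
        }
        // Assumed fallback for unrecognized conditions.
        return false;
    }
}
```

Checking `>=` and `<=` before `>` and `<` also removes the need for the `!condition.startsWith(">=")` guards in the original.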

Comment on lines +213 to +217
if (replaceAll) {
return p.matcher(text).replaceAll(replacement);
} else {
return p.matcher(text).replaceFirst(replacement);
}
Collaborator

nit: can just return, no need else

if () {return;} return;

Comment on lines +281 to +301
if ("object".equalsIgnoreCase(extractType)) {
if (jsonNode.isObject()) {
return mapper.convertValue(jsonNode, Map.class);
} else {
return defaultValue != null ? defaultValue : input;
}
} else if ("array".equalsIgnoreCase(extractType)) {
if (jsonNode.isArray()) {
return mapper.convertValue(jsonNode, List.class);
} else {
return defaultValue != null ? defaultValue : input;
}
} else { // auto
if (jsonNode.isObject()) {
return mapper.convertValue(jsonNode, Map.class);
} else if (jsonNode.isArray()) {
return mapper.convertValue(jsonNode, List.class);
} else {
return defaultValue != null ? defaultValue : input;
}
}
Collaborator

same here, let's use early return, really helps with readability, too many if/elseif will cause issues
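
One possible early-return shape for the branch logic quoted above, as a rough sketch. To stay self-contained it uses plain `Map`/`List` `instanceof` checks in place of Jackson's `JsonNode` and `mapper.convertValue`, so it is not a drop-in replacement; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical early-return version of the extract-type branching. */
final class ExtractJsonSketch {

    /**
     * @param parsed       the already-parsed value (a Map, a List, or anything else)
     * @param extractType  "object", "array", or any other value for auto
     * @param defaultValue returned when the parsed value has the wrong shape, if non-null
     * @param input        the original input, returned as the last resort
     */
    static Object extract(Object parsed, String extractType, Object defaultValue, Object input) {
        boolean wantObject = !"array".equalsIgnoreCase(extractType); // "object" or auto
        boolean wantArray = !"object".equalsIgnoreCase(extractType); // "array" or auto

        if (wantObject && parsed instanceof Map) {
            return parsed;
        }
        if (wantArray && parsed instanceof List) {
            return parsed;
        }
        // Single fallback instead of three copies of the same expression.
        return defaultValue != null ? defaultValue : input;
    }
}
```

The three identical fallback expressions collapse into one, which is the main readability gain here.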

@pyek-bot
Collaborator

Overall looks good, left comments to clean up code. Will test it out more thoroughly.

try {
return Pattern.matches(regex, strValue);
} catch (Exception e) {
log.warn("Invalid regex in condition: {}", regex);
Contributor

Should we throw an error here instead of failing silently?
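
If the answer is yes, one option is to rethrow with context rather than log and continue. This is a sketch under assumptions: the class and method names are hypothetical, and `IllegalArgumentException` is only one reasonable choice of exception type.

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

/** Hypothetical fail-fast variant of the regex condition check. */
final class RegexConditionSketch {

    static boolean matchesRegex(String regex, String strValue) {
        try {
            return Pattern.matches(regex, strValue);
        } catch (PatternSyntaxException e) {
            // Surface the misconfiguration to the caller instead of hiding it behind a log line.
            throw new IllegalArgumentException("Invalid regex in condition: " + regex, e);
        }
    }
}
```

Catching `PatternSyntaxException` rather than `Exception` keeps unrelated failures from being reported as a bad regex.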

}
```

Sampel output
Contributor

nit: typo

@@ -0,0 +1,543 @@
# 1. Create Model
Contributor

Can we explain what the tutorial is for before jumping into steps?

@@ -0,0 +1,633 @@
# 1. Create Model
Contributor

Same feedback, could we explain what this tutorial does?

{
"name": "Query DSL Translator Agent",
"type": "flow",
"description": "This is a demo agent for translating NLQ to OpenSearcdh DSL",
Contributor

nit: typo

}
```

Sampel output
Contributor

nit: typo

{
"name": "Query DSL Translator Agent",
"type": "flow",
"description": "This is a demo agent for translating NLQ to OpenSearcdh DSL",
Contributor

nit: typo

@jiapingzeng
Contributor

Overall LGTM, have some smaller comments.

@pyek-bot
Collaborator

@ylwu-amzn let's separate the tutorial and the PR? Can you raise a separate PR with the tutorial changes?
